//import org.apache.flink.api.common.eventtime.Watermark;
//import org.apache.flink.api.common.eventtime.WatermarkGenerator;
//import org.apache.flink.api.common.eventtime.WatermarkOutput;
//import org.apache.flink.api.common.eventtime.WatermarkStrategy;
//import org.apache.flink.streaming.api.TimeCharacteristic;
//import org.apache.flink.streaming.api.datastream.DataStream;
//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.table.api.Table;
//import org.apache.flink.table.api.Tumble;
//import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
//import org.apache.flink.table.functions.AggregateFunction;
//import org.apache.flink.types.Row;
//
//import java.util.Arrays;
//
//import static org.apache.flink.table.api.Expressions.*;
//
//
//public class OverWindows
//{
//
//    public static void main(String[] args) throws Exception
//    {
//
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//
//
//        DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
//                new OrderStream(1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00 (UTC+8)
//                new OrderStream(1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00 (UTC+8)
//                new OrderStream(3L, "rubber", 2, 1505527800L),//2017-09-16 10:10:00 (UTC+8)
//                new OrderStream(3L, "rubber", 2, 1505527800L),//2017-09-16 10:10:00 (UTC+8)
//                new OrderStream(1L, "diaper", 4, 1505528400L),//2017-09-16 10:20:00 (UTC+8)
//                new OrderStream(1L, "diaper", 5, 1505528400L)//2017-09-16 10:20:00 (UTC+8)
//        ));
//
//
////-----------------------------------------------------------------------------------------------------
//        Table orders = tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((ctx) -> new WatermarkGenerator<OrderStream>() {
//                    @Override
//                    public void onEvent(OrderStream order, long eventTimestamp, WatermarkOutput watermarkOutput) {
//                        watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
//                    }
//
//                    @Override
//                    public void onPeriodicEmit(WatermarkOutput watermarkOutput)
//                    {
//                    }
//                }))
//                , $("user"), $("product"), $("amount"), $("rowtime").rowtime());
//        System.out.println(orders.getSchema());
//
//
//
//
//
//        AggregateFunction myAggFunc = new MyMinMax();
//        tEnv.registerFunction("myAggFunc", myAggFunc);
//
//
//
//// Distinct aggregation on time window group by (completed successfully)
//
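//        // NOTE: template snippet - "[OverWindow w]" is a placeholder, not valid Java, and the
//        // columns a/b/c are not fields of "orders" (user, product, amount, rowtime).
//        Table table = orders
//                .window([OverWindow w].as("w"))           // define over window with alias w
//  .select($("a"), $("b").sum().over($("w")), $("c").min().over($("w"))); // aggregate over the over window w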
//
//
//
//
//        Table groupByDistinctResult = orders
//                .window(Tumble.over(lit(5).minutes())
//                        .on($("rowtime"))
//                        .as("w"))
//                .groupBy($("user"),$("product"),$("amount"),$("w"))
//                .aggregate(call("myAggFunc", $("amount")).as("x", "y"))
//                .select($("user"),$("x"),$("y"), $("w").start(), $("w").end());
//
//        tEnv.toRetractStream(groupByDistinctResult, Row.class).print();
//        env.execute();
//
//
//
//
//    }
//
//
//}
//
